package com.qsl.listener;

import com.qsl.support.Delivery;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.PossibleAuthenticationFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Minimal message listener container: creates a configurable number of
 * {@link BlockingQueueConsumer}s, runs each one on its own thread, and passes every
 * received {@link Delivery} to {@code executeListener}.
 *
 * <p>The set of live consumers is guarded by {@link #consumersMonitor}; every read or
 * write of that set happens while holding the monitor.
 *
 * @author 青石路
 */
public class SimpleMessageListenerContainer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListenerContainer.class);

    /** Maximum time {@link #start()} waits for each consumer thread to report that it has started. */
    private static final long CONSUMER_START_TIMEOUT_MILLIS = 60000;

    /** Timeout, in milliseconds, passed to {@code BlockingQueueConsumer.nextMessage} on each poll. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 1000;

    /** Monitor guarding {@code consumers}. */
    protected final Object consumersMonitor = new Object();

    private volatile int concurrentConsumers = 1;
    /** Live consumers; {@code null} while the container is stopped. Guarded by {@link #consumersMonitor}. */
    private Set<BlockingQueueConsumer> consumers;
    private int batchSize = 1;
    private volatile int prefetchCount = 250;
    private volatile boolean isAutoAck = false;
    private volatile ConnectionFactory connectionFactory;
    private volatile List<String> queues;

    /**
     * Creates an unconfigured container. A connection factory and the queues must be set
     * before {@link #start()} is called.
     */
    public SimpleMessageListenerContainer() {
    }

    /**
     * Creates a container that uses the given connection factory.
     *
     * @param connectionFactory the factory handed to every consumer this container creates
     */
    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        setConnectionFactory(connectionFactory);
    }

    /**
     * Creates the consumers, starts one thread per consumer and blocks until every thread
     * has reported that its startup attempt finished (successfully or not).
     *
     * @throws IllegalStateException if the container already has consumers, or if the
     *         connection factory or queues have not been configured
     * @throws RuntimeException if a consumer does not report startup within the timeout,
     *         or if the calling thread is interrupted while waiting
     */
    public void start() {
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                throw new IllegalStateException("A stopped container should not have consumers");
            }
            int newConsumers = initializeConsumers();
            if (this.consumers == null) {
                logger.info("Consumers were initialized and then cleared " +
                        "(presumably the container was stopped concurrently)");
                return;
            }
            if (newConsumers <= 0) {
                logger.info("Consumers are already running");
                return;
            }
            Set<AsyncMessageProcessingConsumer> processors = new HashSet<>();
            for (BlockingQueueConsumer consumer : this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                new Thread(processor).start();
            }
            // Waiting while holding the monitor cannot deadlock: each consumer thread counts
            // its latch down before it first needs the monitor (in isActive).
            waitForConsumersToStart(processors);
        }
    }

    /**
     * Stops every consumer and marks the container as stopped. Calling this on a container
     * that is already stopped only logs a message.
     */
    public void shutdown() {
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
                while (consumerIterator.hasNext()) {
                    BlockingQueueConsumer consumer = consumerIterator.next();
                    consumer.stop();
                    consumerIterator.remove();
                }
                this.consumers = null;
            }
            else {
                logger.info("Shutdown ignored - container is already stopped");
            }
        }
    }

    /**
     * Blocks until each processor has signalled that its startup attempt is over.
     *
     * @param processors the processors whose threads were just started
     * @throws RuntimeException if the wait times out or the calling thread is interrupted
     */
    private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> processors) {
        for (AsyncMessageProcessingConsumer processor : processors) {
            try {
                processor.ready();
            } catch (InterruptedException e) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * @param concurrentConsumers number of consumers (and threads) created by {@link #start()}
     */
    public void setConcurrentConsumers(int concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers;
    }

    /**
     * @param batchSize batch size; used as the lower bound for the effective prefetch count
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @param prefetchCount requested prefetch count for newly created consumers
     */
    public void setPrefetchCount(int prefetchCount) {
        this.prefetchCount = prefetchCount;
    }

    /**
     * @param autoAck auto-acknowledge flag passed to newly created consumers
     */
    public void setAutoAck(boolean autoAck) {
        isAutoAck = autoAck;
    }

    /**
     * @param connectionFactory the factory handed to every consumer this container creates
     */
    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * @param queues names of the queues every consumer is created for
     */
    public void setQueues(List<String> queues) {
        this.queues = queues;
    }

    /**
     * Creates the configured number of consumers if the container currently has none.
     *
     * <p>The new set is published only after every consumer has been created, so a failure
     * part-way through leaves the container in its stopped state and {@link #start()} can
     * be retried.
     *
     * @return the number of consumers created; {@code 0} if consumers already existed
     */
    protected int initializeConsumers() {
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                return 0;
            }
            // Read the volatile setting once so capacity and loop bound always agree.
            int target = this.concurrentConsumers;
            Set<BlockingQueueConsumer> created = new HashSet<>(target);
            for (int i = 0; i < target; i++) {
                created.add(createBlockingQueueConsumer());
            }
            this.consumers = created;
            return created.size();
        }
    }

    /**
     * Tells whether the given consumer is still registered with this container.
     *
     * @param consumer the consumer to look up
     * @return {@code true} if the container is running and still holds the consumer
     */
    private boolean isActive(BlockingQueueConsumer consumer) {
        // The set is a plain HashSet mutated by other threads; only touch it under the monitor.
        synchronized (this.consumersMonitor) {
            return this.consumers != null && this.consumers.contains(consumer);
        }
    }

    /**
     * Builds a new consumer from the current configuration.
     *
     * @return a new, not yet started consumer
     * @throws IllegalStateException if the connection factory or the queues were never set
     */
    protected BlockingQueueConsumer createBlockingQueueConsumer() {
        // Snapshot the volatile fields so the checks and the use see the same values.
        ConnectionFactory factory = this.connectionFactory;
        List<String> queueNames = this.queues;
        if (factory == null) {
            throw new IllegalStateException("No connection factory configured; call setConnectionFactory first");
        }
        if (queueNames == null) {
            throw new IllegalStateException("No queues configured; call setQueues first");
        }
        // There's no point prefetching less than the tx size, otherwise the consumer will stall because the broker
        // didn't get an ack for delivered messages
        int actualPrefetchCount = Math.max(this.prefetchCount, this.batchSize);
        String[] queueArray = queueNames.toArray(new String[0]);
        return new BlockingQueueConsumer(factory, actualPrefetchCount, this.isAutoAck, queueArray);
    }

    /**
     * Replaces a failed consumer with a freshly created one and starts a thread for it.
     * Does nothing when the container has been stopped in the meantime.
     *
     * @param oldConsumer the consumer whose processing loop terminated with an exception
     * @throws RuntimeException if stopping the old consumer or creating the new one fails
     */
    private void restart(BlockingQueueConsumer oldConsumer) {
        synchronized (this.consumersMonitor) {
            if (this.consumers == null) {
                return;
            }
            BlockingQueueConsumer replacement;
            try {
                // Need to recycle the channel in this consumer
                oldConsumer.stop();
                // Ensure consumer counts are correct (another is going
                // to start because of the exception, but
                // we haven't counted down yet)
                this.consumers.remove(oldConsumer);
                replacement = createBlockingQueueConsumer();
                this.consumers.add(replacement);
            }
            catch (RuntimeException e) {
                logger.warn("Consumer failed irretrievably on restart. {}: {}", e.getClass(), e.getMessage());
                // Re-throw and have it logged properly by the caller.
                throw e;
            }
            new Thread(new AsyncMessageProcessingConsumer(replacement)).start();
        }
    }

    /**
     * Runnable that drives a single consumer: starts it, polls it for deliveries while it is
     * active, and finally either cancels it or asks the container to restart it.
     */
    private final class AsyncMessageProcessingConsumer implements Runnable {

        private final BlockingQueueConsumer consumer;
        /** Counted down once the startup attempt of the consumer has finished. */
        private final CountDownLatch start;

        AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
            this.consumer = consumer;
            this.start = new CountDownLatch(1);
        }

        /**
         * Waits until the consumer thread has finished its startup attempt.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws RuntimeException if the startup latch is not released within the timeout
         */
        private void ready() throws InterruptedException {
            if (!this.start.await(CONSUMER_START_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                logger.error("Consumer failed to start in {} milliseconds; does the task executor have enough "
                        + "threads to support the container concurrency?", CONSUMER_START_TIMEOUT_MILLIS);
                throw new RuntimeException(
                        "Consumer failed to start in " + CONSUMER_START_TIMEOUT_MILLIS + " milliseconds");
            }
        }

        /**
         * Starts the consumer and processes deliveries until the consumer is no longer active
         * and has no pending delivery, or until an exception ends the loop. Interrupts,
         * authentication failures and {@link Error}s abort the consumer; any other exception
         * leads to a restart attempt.
         */
        @Override
        public void run() {
            boolean aborted = false;
            try {
                initialize();
                while (isActive(this.consumer) || this.consumer.hasDelivery()) {
                    mainLoop();
                }
            } catch (InterruptedException e) {
                logger.debug("Consumer thread interrupted, processing stopped.");
                Thread.currentThread().interrupt();
                aborted = true;
            } catch (PossibleAuthenticationFailureException ex) {
                logger.error("Consumer received fatal exception during processing", ex);
                aborted = true;
            } catch (Error e) { //NOSONAR
                // ok to catch Error - we're aborting so will stop
                logger.error("Consumer thread error, thread abort.", e);
                aborted = true;
            } catch (Throwable t) { //NOSONAR
                // by now, it must be an exception
                logger.error("Consumer raised exception, attempting restart", t);
            }
            // In all cases count down to allow container to progress beyond startup
            this.start.countDown();
            killOrRestart(aborted);
        }

        /**
         * Polls the consumer once and dispatches the delivery, if any.
         */
        private void mainLoop() throws Exception {
            Channel channel = this.consumer.getChannel();
            Delivery delivery = this.consumer.nextMessage(RECEIVE_TIMEOUT_MILLIS);
            if (delivery != null) {
                // Find the matching listener and run the business logic
                executeListener(channel, delivery);
            }
        }

        /**
         * Starts the underlying consumer and releases the startup latch whether or not the
         * start succeeded, so that the thread blocked in {@code ready()} can proceed.
         */
        private void initialize() throws Throwable {
            try {
                this.consumer.start();
            } finally {
                this.start.countDown();
            }
        }

        /**
         * Cancels the consumer when it was aborted or is no longer registered with the
         * container; otherwise asks the container to replace it.
         *
         * @param aborted whether processing ended with a condition that must not be retried
         */
        private void killOrRestart(boolean aborted) {
            if (!isActive(this.consumer) || aborted) {
                logger.debug("Cancelling {}", this.consumer);
                this.consumer.stop();
                synchronized (SimpleMessageListenerContainer.this.consumersMonitor) {
                    // After shutdown() the set is null; there is nothing left to deregister.
                    Set<BlockingQueueConsumer> registered = SimpleMessageListenerContainer.this.consumers;
                    if (registered != null) {
                        registered.remove(this.consumer);
                    }
                }
            }
            else {
                logger.info("Restarting {}", this.consumer);
                restart(this.consumer);
            }
        }
    }

    /**
     * Handles a single delivery. Currently only logs the queue name and the UTF-8 decoded
     * body; listener matching and invocation are not implemented yet.
     *
     * @param channel the channel of the consumer that received the delivery (unused for now)
     * @param delivery the received message
     */
    private void executeListener(Channel channel, Delivery delivery) {
        logger.info("收到队列[{}]的消息[{}]", delivery.getQueue(), new String(delivery.getBody(), StandardCharsets.UTF_8));
        logger.info("匹配合适的 listener，通过反射invoke");
        // TODO match the appropriate listener
    }
}
